#!/usr/bin/env bash
#
# Helpers for checking that replicas of Replicated* tables converge.
# Defines try_sync_replicas and check_replication_consistency; there is no
# top-level logic besides loading the library below, so this file is
# presumably meant to be sourced by a test script.
#
# Variables read from the environment of the sourcing script:
#   CURDIR            - directory that contains mergetree_mutations.lib
#   CLICKHOUSE_CLIENT - command used to run queries
#
# shellcheck source=./mergetree_mutations.lib
. "$CURDIR"/mergetree_mutations.lib

# Runs SYSTEM SYNC REPLICA ... STRICT in parallel on every Replicated* table
# of the current database whose name starts with the given prefix.
#
# Before syncing it
#   * drops (best effort) partitions that are referenced by replication queue
#     entries failing with "No active replica has part" but have no parts in
#     system.parts for the matching tables;
#   * sets max_replicated_merges_in_queue=0 on every table (not restored).
#
# Globals:
#   CLICKHOUSE_CLIENT (read) - client command; expanded unquoted on purpose so
#                              that it may carry additional arguments
# Arguments:
#   $1 - table name prefix
#   $2 - value passed as --receive_timeout to every SYNC REPLICA query
# Outputs (stdout):
#   "Replication did not hang: ..." when every table was synced; otherwise the
#   replication queue of each failed table and "Failed to sync some replicas".
# Returns:
#   0 if all replicas were synced, 1 if at least one sync failed.
function try_sync_replicas()
{
    local table_name_prefix=$1
    local time_left=$2
    local -a empty_partitions_arr=() tables_arr=() pids=()
    local t p pid
    local failed=0

    readarray -t empty_partitions_arr < <(${CLICKHOUSE_CLIENT} -q \
        "SELECT DISTINCT substr(new_part_name, 1, position(new_part_name, '_') - 1) AS partition_id
        FROM system.replication_queue
        WHERE (database = currentDatabase()) AND (table LIKE '$table_name_prefix%') AND (last_exception LIKE '%No active replica has part%') AND (partition_id NOT IN (
            SELECT partition_id
            FROM system.parts
            WHERE (database = currentDatabase()) AND (table LIKE '$table_name_prefix%')
        ))")
    readarray -t tables_arr < <(${CLICKHOUSE_CLIENT} -q "SELECT name FROM system.tables WHERE database=currentDatabase() AND name like '$table_name_prefix%' AND engine like '%Replicated%'")

    for t in "${tables_arr[@]}"
    do
        for p in "${empty_partitions_arr[@]}"
        do
            # Avoid "Empty part ... is not created instead of lost part because there are no parts in partition"
            # Errors are deliberately ignored: the partition may not exist on this table.
            $CLICKHOUSE_CLIENT -q "ALTER TABLE $t DROP PARTITION ID '$p'" 2>/dev/null
        done
    done

    for t in "${tables_arr[@]}"
    do
        # Do not start new merges (it can make SYNC a bit faster)
        $CLICKHOUSE_CLIENT -q "ALTER TABLE $t MODIFY SETTING max_replicated_merges_in_queue=0"
    done

    # Start one background job per table; a job exits non-zero if its sync failed
    # (after dumping the replication queue of that table for debugging).
    for t in "${tables_arr[@]}"
    do
        (
            $CLICKHOUSE_CLIENT --receive_timeout "$time_left" -q "SYSTEM SYNC REPLICA $t STRICT" && exit 0
            $CLICKHOUSE_CLIENT -q \
                "select 'sync failed, queue:', * from system.replication_queue where database=currentDatabase() and table='$t' order by database, table, node_name"
            exit 1
        ) &
        pids+=("$!")
    done

    # Reap every job before reporting, so that no job is left behind and all
    # diagnostics are printed. The failure is recorded in the current shell:
    # an "exit" inside ( ... ) would only leave the subshell.
    for pid in "${pids[@]}"
    do
        if ! wait "$pid"; then
            echo "Failed to sync some replicas"
            failed=1
        fi
    done

    if [[ $failed -ne 0 ]]; then
        return 1
    fi
    echo "Replication did not hang: synced all replicas of $table_name_prefix"
}

# Waits until the tables matching the prefix are idle and synced, then checks
# that the given aggregate expression yields the same value on every table.
#
# Globals:
#   CLICKHOUSE_CLIENT (read) - client command; expanded unquoted on purpose so
#                              that it may carry additional arguments
# Arguments:
#   $1 - table name prefix (also used as the pattern passed to merge())
#   $2 - SQL expression(s) evaluated per table (GROUP BY _table) and compared
#        across tables
# Outputs (stdout):
#   The output of try_sync_replicas followed by "Consistency: <n>", where <n> is
#   the number of distinct per-table results (1 means consistent). Diagnostics
#   are printed if queries do not finish in time or if the results differ.
# Returns:
#   0 when the replicas are consistent (or all of them are readonly).
#   Calls "exit 1" - terminating the calling shell - if syncing fails or the
#   replicas have diverged.
function check_replication_consistency()
{
    local table_name_prefix=$1
    local check_query_part=$2
    local num_tries=0
    local time_left res some_table

    # Wait for all queries to finish (query may still be running if thread is killed by timeout).
    # The expected count is 1 - presumably the polling query itself, whose text contains the prefix.
    while [[ $($CLICKHOUSE_CLIENT -q "SELECT count() FROM system.processes WHERE current_database=currentDatabase() AND query LIKE '%$table_name_prefix%'") -ne 1 ]]; do
        sleep 1
        num_tries=$((num_tries+1))
        if [[ $num_tries -ge 250 ]]; then
            echo "Queries for $table_name_prefix did not finish automatically after 250+ seconds"
            echo "==================== QUERIES ===================="
            $CLICKHOUSE_CLIENT -q "SELECT * FROM system.processes WHERE current_database=currentDatabase() AND query LIKE '%$table_name_prefix%' FORMAT Vertical"
            echo "==================== STACK  TRACES ===================="
            $CLICKHOUSE_CLIENT -q "SELECT query_id, thread_name, thread_id, arrayStringConcat(arrayMap(x -> demangle(addressToSymbol(x)), trace), '\n') FROM system.stack_trace where query_id IN (SELECT query_id FROM system.processes WHERE current_database=currentDatabase() AND query LIKE '%$table_name_prefix%') SETTINGS allow_introspection_functions=1 FORMAT Vertical"
            echo "==================== MUTATIONS ===================="
            $CLICKHOUSE_CLIENT -q "SELECT * FROM system.mutations WHERE database=currentDatabase() FORMAT Vertical"
            break
        fi
    done

    # Do not check anything if all replicas are readonly,
    # because in this case all replicas are probably lost (it may happen and it's not a bug)
    res=$($CLICKHOUSE_CLIENT -q "SELECT count() - sum(is_readonly) FROM system.replicas WHERE database=currentDatabase() AND table LIKE '$table_name_prefix%'")
    if [[ "$res" == 0 ]]; then
        # Print dummy lines
        echo "Replication did not hang: synced all replicas of $table_name_prefix"
        echo "Consistency: 1"
        return 0
    fi

    # Touch all data to check that it's readable (and trigger PartCheckThread if needed)
    while ! $CLICKHOUSE_CLIENT -q "SELECT * FROM merge(currentDatabase(), '$table_name_prefix') FORMAT Null" 2>/dev/null; do
        sleep 1
        num_tries=$((num_tries+1))
        # The counter is shared with the loop above and may already have reached
        # 250 there, so "-eq 250" could be skipped over and loop forever.
        if [[ $num_tries -ge 250 ]]; then
            break
        fi
    done
    time_left=$((300 - num_tries))

    # Trigger pullLogsToQueue(...) and updateMutations(...) on some replica to make it pull all mutations, so it will be possible to kill them.
    # Done twice, each time on a randomly chosen table; failures are ignored.
    for _ in 1 2; do
        some_table=$($CLICKHOUSE_CLIENT -q "SELECT name FROM system.tables WHERE database=currentDatabase() AND name like '$table_name_prefix%' ORDER BY rand() LIMIT 1")
        $CLICKHOUSE_CLIENT -q "SYSTEM SYNC REPLICA $some_table PULL" 1>/dev/null 2>/dev/null ||:
    done

    # Forcefully cancel mutations to avoid waiting for them to finish
    ${CLICKHOUSE_CLIENT} -q "KILL MUTATION WHERE database=currentDatabase() AND table like '$table_name_prefix%'" > /dev/null

    # SYNC REPLICA is not enough if some MUTATE_PARTs are not assigned yet
    wait_for_all_mutations "$table_name_prefix%"

    try_sync_replicas "$table_name_prefix" "$time_left" || exit 1

    # Number of distinct per-table results; an empty result set counts as consistent (1)
    res=$($CLICKHOUSE_CLIENT -q \
    "SELECT
        if((countDistinct(data) as c) == 0, 1, c)
    FROM
    (
        SELECT _table, ($check_query_part) AS data
        FROM merge(currentDatabase(), '$table_name_prefix') GROUP BY _table
    )")

    echo "Consistency: $res"
    # String comparison: an empty or malformed result (e.g. the query failed)
    # must be treated as a failure instead of tripping a test syntax error.
    if [[ "$res" != 1 ]]; then
        echo "Replicas have diverged:"
        $CLICKHOUSE_CLIENT -q "select 'data', _table, $check_query_part, arraySort(groupArrayDistinct(_part)) from merge(currentDatabase(), '$table_name_prefix') group by _table order by _table"
        $CLICKHOUSE_CLIENT -q "select 'queue', * from system.replication_queue where database=currentDatabase() and table like '$table_name_prefix%' order by database, table, node_name"
        $CLICKHOUSE_CLIENT -q "select 'mutations', * from system.mutations where database=currentDatabase() and table like '$table_name_prefix%' order by database, table, mutation_id"
        $CLICKHOUSE_CLIENT -q "select 'parts', * from system.parts where database=currentDatabase() and table like '$table_name_prefix%' order by database, table, name"
        echo "Good luck with debugging..."
        exit 1
    fi

}

# vi: ft=bash
